package obsbackupstorage

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
	"os"
	"sort"
	"strings"
	"sync"

	obs "github.com/huaweicloud/huaweicloud-sdk-go-obs/obs"
	"github.com/spf13/pflag"

	"vitess.io/vitess/go/vt/concurrency"
	"vitess.io/vitess/go/vt/log"
	"vitess.io/vitess/go/vt/mysqlctl/backupstorage"
	"vitess.io/vitess/go/vt/servenv"
)

var (
	// configFilePath is where the configs/credentials for backups will be stored.
	// It is set by the --obs_backup_storage_config flag (see registerFlags) and
	// read lazily the first time a client is needed.
	configFilePath string
)

// registerFlags declares the command-line flags that configure the OBS
// backup storage plugin on the given flag set.
func registerFlags(fs *pflag.FlagSet) {
	const (
		flagName     = "obs_backup_storage_config"
		defaultValue = "obs_backup_config.json"
		usage        = "Path to JSON config file for obs backup storage."
	)
	fs.StringVar(&configFilePath, flagName, defaultValue, usage)
}

// init registers the OBS backup storage flags with every Vitess binary
// that can perform or manage backups.
func init() {
	for _, cmd := range []string{"vtbackup", "vtctl", "vtctld", "vttablet"} {
		servenv.OnParseFor(cmd, registerFlags)
	}
}

// storageConfig holds the credentials and endpoint decoded from the JSON
// file at configFilePath. It is populated lazily by (*OBSBackupStorage).client.
var storageConfig struct {
	AccessKey     string `json:"accessKey"`     // OBS access key ID
	SecretKey     string `json:"secretKey"`     // OBS secret access key
	EndPoint      string `json:"endPoint"`      // OBS service endpoint URL
	SecurityToken string `json:"securityToken"` // optional security token
}

// OBSBackupHandle implements BackupHandle for Huawei Cloud OBS.
type OBSBackupHandle struct {
	client    *obs.ObsClient               // OBS client used for object operations
	bs        *OBSBackupStorage            // storage that created this handle
	dir       string                       // keyspace/shard directory of the backup
	name      string                       // name of this backup within dir
	readOnly  bool                         // true for handles returned by ListBackups
	errors    concurrency.AllErrorRecorder // collects async upload errors
	waitGroup sync.WaitGroup               // tracks in-flight AddFile goroutines
}

// RecordError is part of the concurrency.ErrorRecorder interface.
// It remembers an asynchronous upload error so EndBackup can report it.
func (bh *OBSBackupHandle) RecordError(err error) {
	bh.errors.RecordError(err)
}

// HasErrors is part of the concurrency.ErrorRecorder interface.
// It reports whether any error has been recorded on this handle.
func (bh *OBSBackupHandle) HasErrors() bool {
	return bh.errors.HasErrors()
}

// Error is part of the concurrency.ErrorRecorder interface.
// It returns the recorded errors combined into a single error, or nil.
func (bh *OBSBackupHandle) Error() error {
	return bh.errors.Error()
}

// Directory implements BackupHandle.
// It returns the keyspace/shard directory this backup lives in.
func (bh *OBSBackupHandle) Directory() string {
	return bh.dir
}

// Name implements BackupHandle.
// It returns the name of this backup within its directory.
func (bh *OBSBackupHandle) Name() string {
	return bh.name
}

// AddFile implements BackupHandle.
// It returns a WriteCloser; everything written to it is streamed to OBS in a
// background goroutine. EndBackup waits for the upload and reports its error.
// filesize is currently unused (the upload is streamed); -1 means unknown.
func (bh *OBSBackupHandle) AddFile(ctx context.Context, filename string, filesize int64) (io.WriteCloser, error) {
	if bh.readOnly {
		return nil, fmt.Errorf("AddFile cannot be called on read-only backup")
	}
	reader, writer := io.Pipe()
	bh.waitGroup.Add(1)
	go func() {
		defer bh.waitGroup.Done()

		// obs bucket name is where the backups will go;
		// the backup handle dir field contains the keyspace/shard value.
		bucket := alterBucketName(bh.dir)
		object := objName(bh.dir, bh.name, filename)

		input := &obs.PutObjectInput{}
		input.Bucket = bucket
		input.Key = object
		// Give PutObject() the read end of the pipe. Without this the
		// writer handed to the caller would block forever, since nothing
		// would ever consume the pipe.
		input.Body = reader

		_, err := bh.client.PutObject(input)
		if err != nil {
			// Signal the writer that an error occurred, in case it's not done writing yet.
			reader.CloseWithError(err)
			// In case the error happened after the writer finished, we need to remember it.
			bh.RecordError(err)
		}
	}()
	// Give our caller the write end of the pipe.
	return writer, nil
}

// EndBackup implements BackupHandle.
// It blocks until every pending AddFile upload has finished, then returns
// any errors those uploads recorded.
func (bh *OBSBackupHandle) EndBackup(ctx context.Context) error {
	if bh.readOnly {
		return errors.New("EndBackup cannot be called on read-only backup")
	}
	// Wait for all in-flight PutObject goroutines before collecting errors.
	bh.waitGroup.Wait()
	return bh.Error()
}

// AbortBackup implements BackupHandle.
// It discards the partially written backup by removing everything stored
// under it so far.
func (bh *OBSBackupHandle) AbortBackup(ctx context.Context) error {
	if bh.readOnly {
		return errors.New("AbortBackup cannot be called on read-only backup")
	}
	return bh.bs.RemoveBackup(ctx, bh.dir, bh.name)
}

// ReadFile implements BackupHandle.
// It returns a ReadCloser streaming the named file of this backup from OBS;
// the caller is responsible for closing it.
func (bh *OBSBackupHandle) ReadFile(ctx context.Context, filename string) (io.ReadCloser, error) {
	if !bh.readOnly {
		return nil, fmt.Errorf("ReadFile cannot be called on read-write backup")
	}
	// obs bucket name
	bucket := alterBucketName(bh.dir)
	object := objName(bh.dir, bh.name, filename)

	input := &obs.GetObjectInput{}
	input.Bucket = bucket
	input.Key = object

	output, err := bh.client.GetObject(input)
	if err != nil {
		return nil, err
	}
	return output.Body, nil
}

// OBSBackupStorage implements BackupStorage for Huawei Cloud OBS.
type OBSBackupStorage struct {
	// _client is the lazily created OBS client instance.
	// Guarded by mu. Close() resets it to nil so that a new client is
	// created the next time one is needed.
	_client *obs.ObsClient
	// mu guards all fields.
	mu sync.Mutex
}

// ListBackups implements BackupStorage.
// It returns one read-only handle per backup stored under dir, ordered
// oldest first (by name).
func (bs *OBSBackupStorage) ListBackups(ctx context.Context, dir string) ([]backupstorage.BackupHandle, error) {
	c, err := bs.client()
	if err != nil {
		return nil, err
	}
	// obs bucket name
	bucket := alterBucketName(dir)

	// List prefixes that begin with dir (i.e. list subdirs).
	searchPrefix := objName(dir, "")

	input := &obs.ListObjectsInput{}
	input.Bucket = bucket
	input.Prefix = searchPrefix
	// With a delimiter, each backup comes back exactly once as a common
	// prefix instead of once per object stored beneath it.
	input.Delimiter = "/"

	var subdirs []string
	for {
		output, err := c.ListObjects(input)
		if err != nil {
			return nil, err
		}
		for _, prefix := range output.CommonPrefixes {
			subdir := strings.TrimPrefix(prefix, searchPrefix)
			subdir = strings.TrimSuffix(subdir, "/")
			subdirs = append(subdirs, subdir)
		}
		// Follow pagination until the full listing has been consumed.
		if !output.IsTruncated {
			break
		}
		input.Marker = output.NextMarker
	}

	// Backups must be returned in order, oldest first.
	sort.Strings(subdirs)

	result := make([]backupstorage.BackupHandle, 0, len(subdirs))
	for _, subdir := range subdirs {
		result = append(result, &OBSBackupHandle{
			client:   c,
			bs:       bs,
			dir:      dir,
			name:     subdir,
			readOnly: true,
		})
	}
	return result, nil
}

// StartBackup implements BackupStorage.
// It ensures the destination bucket exists (creating it if necessary) and
// returns a writable handle for the new backup.
func (bs *OBSBackupStorage) StartBackup(ctx context.Context, dir, name string) (backupstorage.BackupHandle, error) {
	c, err := bs.client()
	if err != nil {
		return nil, err
	}
	// obs bucket name
	bucket := alterBucketName(dir)

	exists := false
	output, err := c.HeadBucket(bucket)
	if err == nil {
		exists = output.StatusCode == http.StatusOK
	} else if obsErr, ok := err.(obs.ObsError); ok && obsErr.StatusCode == http.StatusNotFound {
		// HeadBucket returns a 404 error when the bucket is missing; that
		// is not fatal here — we create the bucket below.
		exists = false
	} else {
		log.Infof("Error from HeadBucket(%v): %v, quitting", bucket, err)
		return nil, errors.New("Error checking whether bucket exists: " + bucket)
	}

	if !exists {
		log.Infof("Bucket: %v doesn't exist, creating new bucket with the required name", bucket)
		input := &obs.CreateBucketInput{}
		input.Bucket = bucket
		if _, err := c.CreateBucket(input); err != nil {
			log.Infof("Error creating Bucket: %v, quitting", bucket)
			return nil, errors.New("Error creating new bucket: " + bucket)
		}
	}

	return &OBSBackupHandle{
		client:   c,
		bs:       bs,
		dir:      dir,
		name:     name,
		readOnly: false,
	}, nil
}

// RemoveBackup implements BackupStorage.
// It deletes every object stored under dir/name, following pagination so
// backups with many files are fully removed.
func (bs *OBSBackupStorage) RemoveBackup(ctx context.Context, dir, name string) error {
	c, err := bs.client()
	if err != nil {
		return err
	}
	// obs bucket name
	bucket := alterBucketName(dir)

	// Trailing empty part yields a "dir/name/" prefix matching all files
	// of this backup.
	fullName := objName(dir, name, "")

	input := &obs.ListObjectsInput{}
	input.Bucket = bucket
	input.Prefix = fullName

	for {
		output, err := c.ListObjects(input)
		if err != nil {
			return err
		}
		for _, object := range output.Contents {
			deleteInput := &obs.DeleteObjectInput{}
			deleteInput.Bucket = bucket
			deleteInput.Key = object.Key
			if _, err := c.DeleteObject(deleteInput); err != nil {
				return err
			}
		}
		// Keep listing until the prefix has been exhausted.
		if !output.IsTruncated {
			break
		}
		input.Marker = output.NextMarker
	}
	return nil
}

// Close implements BackupStorage.
// It drops the cached client so that a fresh one is created the next time
// one is needed. It never fails.
func (bs *OBSBackupStorage) Close() error {
	bs.mu.Lock()
	defer bs.mu.Unlock()

	// Assigning nil is safe whether or not a client was ever created.
	bs._client = nil
	return nil
}

// WithParams implements BackupStorage. It currently ignores params and
// returns the receiver unchanged.
func (bs *OBSBackupStorage) WithParams(params backupstorage.Params) backupstorage.BackupStorage {
	// TODO(maxeng): return a new OBSBackupStorage that uses params.
	return bs
}

// client returns the OBS client instance.
// If there isn't one yet, it reads the JSON config from configFilePath,
// creates a client from it, and caches it under the mutex.
func (bs *OBSBackupStorage) client() (*obs.ObsClient, error) {
	bs.mu.Lock()
	defer bs.mu.Unlock()

	if bs._client == nil {
		configData, err := os.ReadFile(configFilePath)
		if err != nil {
			return nil, fmt.Errorf("file not present : %v", err)
		}
		if err := json.Unmarshal(configData, &storageConfig); err != nil {
			return nil, fmt.Errorf("error parsing the json file : %v", err)
		}

		accessKey := storageConfig.AccessKey
		secretKey := storageConfig.SecretKey
		url := storageConfig.EndPoint
		token := storageConfig.SecurityToken

		var client *obs.ObsClient
		if token != "" {
			// Pass the optional security token from the config through
			// to the SDK when one is provided.
			client, err = obs.New(accessKey, secretKey, url, obs.WithSecurityToken(token))
		} else {
			client, err = obs.New(accessKey, secretKey, url)
		}
		if err != nil {
			return nil, err
		}
		bs._client = client
	}
	return bs._client, nil
}

// init registers this implementation under the "obs" backup storage
// engine name.
func init() {
	backupstorage.BackupStorageMap["obs"] = &OBSBackupStorage{}
}

// objName joins path parts into an object name.
// Unlike path.Join, it doesn't collapse ".." or strip trailing slashes
// (an empty trailing part produces a "prefix/" style name).
func objName(parts ...string) string {
	var b strings.Builder
	for i, part := range parts {
		if i > 0 {
			b.WriteByte('/')
		}
		b.WriteString(part)
	}
	return b.String()
}

// alterBucketName derives a bucket name from a backup directory.
// Keeping in view the bucket naming conventions, only the keyspace portion
// (everything before the first "/") is used, lowercased, with underscores
// replaced by hyphens.
func alterBucketName(dir string) string {
	keyspace := strings.SplitN(strings.ToLower(dir), "/", 2)[0]
	return strings.Replace(keyspace, "_", "-", -1)
}
